package com.cx.base.day25_Rocketmq.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class DelayConsumer {
    public static void main(String[] args) {
        consumer();
    }

    public static void consumer() {
        //        1 创建消费者Consumer ，并确定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//        2 指定NameServer地址
        consumer.setNamesrvAddr("192.168.48.128:9876;192.168.48.137:9876");
//        consumer.setNamesrvAddr("106.52.131.197:9876;47.96.165.93:9876");
        try {
//            3 订阅主题Topic和Tag
            consumer.subscribe("delayTopic", "*");
//        4 设置回调函数，处理消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    for (MessageExt msg : list) {
                        System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + ",延迟时间：" + (System.currentTimeMillis() - msg.getStoreTimestamp()));
                        try {
                            Thread.sleep(100000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
//        5 启动消息内容
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
}
